/*******************************************************************************
 * Package: com.song.bigdata.v2
 * Type:    Test
 * Date:    2024-09-18 17:56
 *
 * Copyright (c) 2024 LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.bigdata.v2;

import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 功能描述：从数据源读取、数据转换、聚合、窗口化
 * 数据源读取
 * env.fromElements(...)
 * env.readTextFile(...)
 * env.addSource(...)
 *
 * @author Songxianyang
 * @date 2024-09-18 17:56
 */
public class Test {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 录入数据 也就是数据源读取
        List<String> name = Lists.newArrayList("song xi gua", "song", "xian gua", "yang", "yang");
        // 创建流
        DataStreamSource<String> streamSource = env.fromCollection(name);
        // 转换流中的数据（map)
        DataStream<String> upperCaseStream = streamSource.map(value -> value.toUpperCase());
        // 打印流
        upperCaseStream.print("map");
        // 转换流中的数据扁平化（)
        SingleOutputStreamOperator<Object> flatMap = streamSource.flatMap(new FlatMapFunction<String, Object>() {

            private static final long serialVersionUID = 7483808125968087428L;

            // 处理里面的数据
            @Override
            public void flatMap(String value, Collector<Object> out) throws Exception {
                for (String data : value.split(" ")) {
                    // 输出
                    out.collect(data);
                }
            }
        });
        flatMap.print("flat");

        // 过滤数据
        DataStream<String> filter = streamSource.filter(value -> value.equals("yang"));
        filter.print("filter");
        // // 分组
        // KeyedStream<String, Object> group = streamSource.keyBy(new KeySelector<String, Object>() {
        //     @Override
        //     public Object getKey(String key) throws Exception {
        //         return key;
        //     }
        //
        //     private static final long serialVersionUID = -6730191295095496510L;
        // });
        // group.print("group");
        // 执行环境
        env.execute();
    }
}
